In [1]:
import numpy as np
import time
from scipy.stats import chi2
from matplotlib import pyplot as plt
from __future__ import division
from itertools import product
from CAs import *
from Local_Complexity import *
%matplotlib inline
In [2]:
class pure_PLCs(object):
    '''
    Object for collection of past light cones with given depth, propagation speed, and alphabet size.
    Vectorized version.

    The past light cone of depth D at the spacetime point (t, x) consists of the sites (t - d, x + a) for
    d = 1, ..., D - 1 and a = -c*d, ..., c*d, where c is the propagation speed. The present site (d = 0) is
    not part of the light cone. Space is treated as periodic (column indices wrap around).
    '''

    def __init__(self, depth, propogation_speed, alphabet_size):
        '''
        Initializes PLCs object.

        Parameters
        ----------
        depth: int
            Desired depth of past light cones. Depth = 1 is present configuration. Depth = 2 goes one time step
            into the past, etc.

        propogation_speed: int
            Speed of information propagation in spacetime system.

        alphabet_size: int
            Number of distinct possible symbols at each location in spacetime system.
        '''
        self._all_configs = []
        self._all_labels = None
        self._d = depth
        self._c = propogation_speed
        self._A = alphabet_size
        # number of sites in a light cone once the present site has been removed
        self._size = lightcone_size(depth, propogation_speed) - 1
        # lookup structure from configurations to integer labels (built by the external base10_convert helper)
        self._label = base10_convert(self._A, self._size)

    def _cone_string(self, spacetime, i, j, columns):
        '''
        Builds the configuration string of the past light cone at row i, column j.

        Sites are read one time step back at a time, left to right within each step, which gives the
        canonical ordering of the light cone vector. The loop starts at d = 1 so the present site is never
        included; the original code added it and then sliced one character off, which is only correct when
        str(symbol) is a single character.
        '''
        symbols = []
        for d in range(1, self._d):
            reach = self._c*d
            for a in range(-reach, reach + 1):
                symbols.append(str(spacetime[(i - d, (j + a) % columns)]))
        return ''.join(symbols)

    def scan_data(self, data):
        '''
        Scans input data to collect past light cones.

        Any results from a previous scan are discarded.

        Parameters
        ----------
        data: array_like
            Spacetime data for which past light cones are to be gathered. Time must run in
            vertical direction.
        '''
        rows, columns = np.shape(data)
        spacetime = np.copy(data)
        #labels default to zero; rows without a full light cone keep that value
        self._all_labels = np.zeros((rows, columns), dtype = int)
        #reset array for light cone configurations so there is no error when scanning new data
        self._all_configs = []

        for i in range(rows):
            if (i + 1) < self._d:
                #not enough past to get a full depth light cone at this time
                current_time = ['n/a']*columns
            else:
                current_time = [self._cone_string(spacetime, i, j, columns) for j in range(columns)]
                #convert configs to labels: the k-th entry of the index holds the k-th symbol of every
                #light cone in this row.
                #NOTE(review): indexing with a list of character tuples relies on the index semantics of
                #whatever base10_convert returns (and, for ndarrays, on legacy numpy behaviour) - confirm.
                self._all_labels[i] = self._label[list(zip(*current_time))]
            #add populated array of light cones from current time to full array
            self._all_configs += [current_time]

    def all_configs(self):
        '''
        Returns field of past light cone configuration vectors.

        Returns
        -------
        configs: list of lists of str
            Spacetime field of past light cone configuration strings. Rows without a full
            light cone hold the placeholder 'n/a'.
        '''
        return self._all_configs

    def all_labels(self):
        '''
        Returns field of past light cone integer labels.

        Returns
        -------
        labels: ndarray
            Spacetime field of past light cone integer labels (None before scan_data is called).
        '''
        return self._all_labels

    def get_label(self, position):
        '''
        Returns integer label for past light cone at position (x,t).

        Parameters
        ----------
        position: tuple
            Integer spacetime coordinates ordered as (t,x).

        Returns
        -------
        label: int
            Past light cone integer label for light cone at position (x,t).
        '''
        t, x = position
        return self._all_labels[t][x]

    def get_config_str(self, position):
        '''
        Returns configuration vector of past light cone at position (x,t) as a string.

        Parameters
        ----------
        position: tuple
            Spacetime position vector ordered as (t,x).

        Returns
        -------
        config: str
            Configuration vector of past light cone at position (x,t). Ordered from the most recent
            time step to the earliest, reading left to right at each level.
        '''
        t, x = position
        return self._all_configs[t][x]

    def map_to_label(self, configuration):
        '''
        Converts past light cone configuration vector with the PLCs object parameters to the
        corresponding integer label.

        Parameters
        ----------
        configuration: array_like
            Array of past light cone configuration vector.

        Returns
        -------
        label: int
            Integer label corresponding to input past light cone configuration vector.
        '''
        return self._label[tuple(configuration)]

    def map_to_config(self, label):
        '''
        Converts integer label to configuration vector with the PLCs object parameters.

        Parameters
        ----------
        label: int
            Integer label for past light cone.

        Returns
        -------
        config: str
            Past light cone configuration vector associated with given integer label.
        '''
        return label2config(label, self._A, self._size)

    def number_of(self):
        '''
        Returns the number of possible distinct past light cone configurations with the PLCs object parameters,
        depth, propagation speed, and alphabet size.

        Returns
        -------
        number: int
            Number of possible past light cone configurations.
        '''
        return self._A**(self._size)
In [3]:
class pure_FLCs(object):
    '''
    Object for collection of future light cones with given depth, propagation speed, and alphabet size.

    The future light cone of depth D at the spacetime point (t, x) consists of the sites (t + d, x + a) for
    d = 1, ..., D - 1 and a = -c*d, ..., c*d, where c is the propagation speed. The present site (d = 0) is
    not part of the light cone. Space is treated as periodic (column indices wrap around).
    '''

    def __init__(self, depth, propogation_speed, alphabet_size):
        '''
        Instantiates future light cone object.

        Parameters
        ----------
        depth: int
            Depth of the light cones, i.e. how far into the future the light cones reach. Depth = 1 is just
            the spacetime value at that point, i.e. does not reach into the future. Depth = 2 reaches one
            time step into the future, etc.

        propogation_speed: int
            Speed at which information propagates through the spacetime data.

        alphabet_size: int
            Number of distinct symbols present (or that could be present) in the data.
        '''
        self._all_configs = []
        self._all_labels = None
        self._d = depth
        self._c = propogation_speed
        self._A = alphabet_size
        # number of sites in a light cone once the present site has been removed
        self._size = lightcone_size(depth, propogation_speed) - 1
        # lookup structure from configurations to integer labels (built by the external base10_convert helper)
        self._label = base10_convert(self._A, self._size)

    def _cone_string(self, spacetime, i, j, columns):
        '''
        Builds the configuration string of the future light cone at row i, column j.

        Sites are read one time step forward at a time, left to right within each step, which gives the
        canonical ordering of the light cone vector. The loop starts at d = 1 so the present site is never
        included; the original code added it and then sliced one character off, which is only correct when
        str(symbol) is a single character.
        '''
        symbols = []
        for d in range(1, self._d):
            reach = self._c*d
            for a in range(-reach, reach + 1):
                symbols.append(str(spacetime[(i + d, (j + a) % columns)]))
        return ''.join(symbols)

    def scan_data(self, data):
        '''
        Scans input data to collect future light cones.

        Any results from a previous scan are discarded.

        Parameters
        ----------
        data: array_like
            Spacetime data for which future light cones are to be gathered. Time must run in
            vertical direction.
        '''
        rows, columns = np.shape(data)
        spacetime = np.copy(data)
        #labels default to zero; rows without a full light cone keep that value
        self._all_labels = np.zeros((rows, columns), dtype = int)
        #reset configuration array so there is no error when scanning new data
        self._all_configs = []

        for i in range(rows):
            if (rows - i) < self._d:
                #not enough future to get a full depth light cone at this time
                current_time = ['n/a']*columns
            else:
                current_time = [self._cone_string(spacetime, i, j, columns) for j in range(columns)]
                #convert configs to labels: the k-th entry of the index holds the k-th symbol of every
                #light cone in this row.
                #NOTE(review): indexing with a list of character tuples relies on the index semantics of
                #whatever base10_convert returns (and, for ndarrays, on legacy numpy behaviour) - confirm.
                self._all_labels[i] = self._label[list(zip(*current_time))]
            #add populated array of light cones from current time to full array
            self._all_configs += [current_time]

    def all_configs(self):
        '''
        Returns field of future light cone configuration vectors.

        Returns
        -------
        configs: list of lists of str
            Spacetime field of future light cone configuration strings. Rows without a full
            light cone hold the placeholder 'n/a'.
        '''
        return self._all_configs

    def all_labels(self):
        '''
        Returns field of future light cone integer labels.

        Returns
        -------
        labels: ndarray
            Spacetime field of future light cone integer labels (None before scan_data is called).
        '''
        return self._all_labels

    def get_label(self, position):
        '''
        Returns integer label for future light cone configuration at position (x,t).

        Parameters
        ----------
        position: tuple of integers
            Spacetime coordinates ordered as (t,x) for the point at which the label is to be retrieved.

        Returns
        -------
        label: int
            Integer label for future light cone configuration at position (x,t).
        '''
        t, x = position
        return self._all_labels[t][x]

    def get_config_str(self, position):
        '''
        Returns configuration vector of future light cone at position (x,t) as a string.

        Parameters
        ----------
        position: tuple
            Spacetime position vector ordered as (t,x).

        Returns
        -------
        config: str
            Configuration vector of future light cone at position (x,t). Ordered from top to bottom
            (nearest to farthest future), reading left to right at each level.
        '''
        t, x = position
        return self._all_configs[t][x]

    def map_to_label(self, configuration):
        '''
        Converts future light cone configuration vector with the FLCs object parameters to the
        corresponding integer label.

        Parameters
        ----------
        configuration: array_like
            Array of future light cone configuration vector.

        Returns
        -------
        label: int
            Integer label corresponding to input future light cone configuration vector.
        '''
        return self._label[tuple(configuration)]

    def map_to_config(self, label):
        '''
        Converts integer label to configuration vector with the FLCs object parameters.

        Parameters
        ----------
        label: int
            Integer label for future light cone.

        Returns
        -------
        config: str
            Future light cone configuration vector associated with given integer label.
        '''
        return label2config(label, self._A, self._size)

    def number_of(self):
        '''
        Returns the number of possible distinct future light cone configurations with the FLCs object parameters,
        depth, propagation speed, and alphabet size.

        Returns
        -------
        number: int
            Number of possible future light cone configurations.
        '''
        return self._A**(self._size)
In [18]:
# Scratch check of the light cone classes on a random 10x10 field over the symbols {0, 1}.
lc_test = np.random.choice([0,1],(10,10))
print lc_test
In [19]:
# Past light cones with depth 3, propagation speed 1 and alphabet size 2.
plcs = pure_PLCs(3, 1, 2)
plcs.scan_data(lc_test)
# Label and configuration at row 4, column 2.
label = plcs.all_labels()[4][2]
config = plcs.all_configs()[4][2]
print label
print config
# Round trip: configuration -> label and label -> configuration.
print plcs.map_to_label(config)
print plcs.map_to_config(label)
# The accessors are given the same (t, x) position used for the direct indexing above.
print plcs.get_config_str((4,2))
print plcs.get_label((4,2))
In [20]:
# Same checks for future light cones with the same parameters.
flcs = pure_FLCs(3, 1, 2)
flcs.scan_data(lc_test)
Flabel = flcs.all_labels()[4][2]
Fconfig = flcs.all_configs()[4][2]
print Flabel
print Fconfig
print flcs.map_to_label(Fconfig)
print flcs.map_to_config(Flabel)
print flcs.get_label((4,2))
print flcs.get_config_str((4,2))
In [ ]:
In [ ]:
In [4]:
class information_field(object):
    '''
    Partitioning of a process into past, present, and future done locally over a spacetime field using light cones for
    pasts and futures at each point.

    Every retained spacetime point contributes one (past light cone, present, future light cone) triple.
    Triples and their marginals are represented as strings: the past light cone string, str() of the
    present symbol and the future light cone string, concatenated in that order. Probabilities are relative
    frequencies over the retained points and all information quantities are reported in bits.

    Splitting a joint string back into its parts is positional (see splitter), so str() of every symbol
    is assumed to be exactly one character long.
    '''

    def __init__(self, data, past_depth, future_depth, propogation_speed, alphabet_size = None):
        '''
        Scans the data for past and future light cones and sets up the (initially empty) caches.

        Parameters
        ----------
        data: array_like
            Two dimensional spacetime field. Time runs in the vertical direction.

        past_depth: int
            Depth of the past light cones.

        future_depth: int
            Depth of the future light cones.

        propogation_speed: int
            Speed of information propagation used for both kinds of light cone.

        alphabet_size: int, optional
            Number of distinct symbols. If None, the number of distinct values found in data is used.
        '''
        if alphabet_size is None:
            self._A = np.size(np.unique(data))
        else:
            self._A = alphabet_size
        self._time, self._space = np.shape(data)
        self._spacetime = data
        self._past_depth = past_depth
        self._future_depth = future_depth
        self._c = propogation_speed

        self._plcs = pure_PLCs(self._past_depth, self._c, self._A)
        self._plcs.scan_data(data)
        self._flcs = pure_FLCs(self._future_depth, self._c, self._A)
        self._flcs.scan_data(data)

        #keep only the rows that have both a full past light cone and a full future light cone
        first = self._past_depth - 1
        last = self._time - self._future_depth + 1
        self._present = np.copy(self._spacetime[first : last])
        self._past = self._plcs.all_configs()[first : last]
        self._future = self._flcs.all_configs()[first : last]
        self._rows, self._columns = np.shape(self._present)

        #cached probability tables, filled on first request
        self._past_probs = {}
        self._present_probs = {}
        self._future_probs = {}
        self._full_process_probs = {}
        self._past_present_probs = {}
        self._past_future_probs = {}
        self._present_future_probs = {}
        #cached local values of b_mu, r_mu, h_mu, q_mu and sigma_mu, filled on first request
        self._b = {}
        self._r = {}
        self._h = {}
        self._q = {}
        self._sigma = {}

    #------------------------------------------------------------------
    # private helpers
    #------------------------------------------------------------------

    def _concatenate(self, *parts):
        '''
        Returns the field whose entry (i,j) is the concatenation of str(part[i][j]) over the given parts.
        '''
        field = []
        for i in range(self._rows):
            field.append([''.join(str(part[i][j]) for part in parts) for j in range(self._columns)])
        return np.array(field)

    def _cached_probs(self, cache, field_getter):
        '''
        Fills cache (once) with the relative frequency of every distinct entry of field_getter().
        Keys are str() of the entries.
        '''
        if len(cache) == 0:
            states, counts = np.unique(field_getter(), return_counts = True)
            #float() so the result does not depend on "from __future__ import division"
            Z = float(np.sum(counts))
            for state, count in zip(states, counts):
                cache[str(state)] = count / Z
        return cache

    def _observed_triples(self):
        '''
        Yields (past, present, future) strings for every distinct observed joint configuration.
        '''
        for joint in np.unique(self.full_process()):
            yield (str(self.splitter(joint, 'past')),
                   str(self.splitter(joint, 'present')),
                   str(self.splitter(joint, 'future')))

    def _candidate_triples(self):
        '''
        Yields (past, present, future) strings by running over all combinations of observed pasts,
        presents and futures and keeping the combinations that actually occur. This is the brute force
        enumeration used by the *_slow methods.
        '''
        #the keys of the joint distribution are exactly the observed joints
        observed = self.full_process_probs()
        pasts = np.unique(self.past())
        presents = np.unique(self.present())
        futures = np.unique(self.future())
        for l_min in pasts:
            for l in presents:
                for l_plus in futures:
                    triple = (str(l_min), str(l), str(l_plus))
                    if ''.join(triple) in observed:
                        yield triple

    def _observed_pairs(self):
        '''
        Yields (past, present) strings for every distinct observed past-present configuration.
        The present is the last character of the joint string.
        '''
        for joint in np.unique(self.past_present()):
            joint = str(joint)
            yield (joint[:len(joint)-1], joint[len(joint)-1])

    def _candidate_pairs(self):
        '''
        Brute force counterpart of _observed_pairs: all combinations of observed pasts and presents,
        restricted to the combinations that actually occur.
        '''
        observed = self.past_present_probs()
        pasts = np.unique(self.past())
        presents = np.unique(self.present())
        for l_min in pasts:
            for l in presents:
                pair = (str(l_min), str(l))
                if ''.join(pair) in observed:
                    yield pair

    #pointwise quantities, natural logarithm (conversion to bits happens in _expectation / _local_table)

    def _ln_b(self, l_min, l, l_plus):
        numerator = self.full_process_probs()[l_min + l + l_plus] * self.future_probs()[l_plus]
        denominator = self.past_future_probs()[l_min + l_plus] * self.present_future_probs()[l + l_plus]
        return np.log(numerator / denominator)

    def _ln_sigma(self, l_min, l, l_plus):
        numerator = self.full_process_probs()[l_min + l + l_plus] * self.present_probs()[l]
        denominator = self.past_present_probs()[l_min + l] * self.present_future_probs()[l + l_plus]
        return np.log(numerator / denominator)

    def _ln_q(self, l_min, l, l_plus):
        numerator = self.past_present_probs()[l_min + l] * self.past_future_probs()[l_min + l_plus] * \
                    self.present_future_probs()[l + l_plus]
        denominator = self.full_process_probs()[l_min + l + l_plus] * self.present_probs()[l] * \
                      self.past_probs()[l_min] * self.future_probs()[l_plus]
        return np.log(numerator / denominator)

    def _ln_r(self, l_min, l, l_plus):
        return -np.log(self.full_process_probs()[l_min + l + l_plus] / self.past_future_probs()[l_min + l_plus])

    def _ln_h(self, l_min, l):
        return -np.log(self.past_present_probs()[l_min + l] / self.past_probs()[l_min])

    @staticmethod
    def _expectation(probs, ln_term, parts_iter):
        '''
        Probability weighted sum of ln_term over the given configurations, converted to bits.
        '''
        total = 0
        for parts in parts_iter:
            total += probs[''.join(parts)] * ln_term(*parts)
        return total / np.log(2)

    @staticmethod
    def _local_table(cache, ln_term, parts_iter):
        '''
        Fills cache (once) with the pointwise value, in bits, of ln_term for every given configuration.
        '''
        if len(cache) == 0:
            for parts in parts_iter:
                cache[''.join(parts)] = ln_term(*parts) / np.log(2)
        return cache

    def _lookup_field(self, local_values, joints):
        '''
        Returns the field whose entry (i,j) is local_values[joints[i][j]]. Both arguments are built
        once by the caller instead of once per spacetime point.
        '''
        field = np.zeros((self._rows, self._columns))
        for i in range(self._rows):
            for j in range(self._columns):
                field[i][j] = local_values[joints[i][j]]
        return field

    @staticmethod
    def _diagram(field, size, colors):
        '''
        Shows field as an image with a colorbar.
        '''
        plt.figure(figsize = (size, size))
        plt.imshow(field, cmap=colors, interpolation='nearest')
        plt.colorbar()
        plt.show()

    #------------------------------------------------------------------
    # partition of the field
    #------------------------------------------------------------------

    def present(self):
        '''
        Returns the field of present symbols (rows with full past and future light cones only).
        '''
        return np.array(self._present)

    def past(self):
        '''
        Returns the field of past light cone configuration strings.
        '''
        return np.array(self._past)

    def future(self):
        '''
        Returns the field of future light cone configuration strings.
        '''
        return np.array(self._future)

    def full_process(self):
        '''
        Returns the field of joint past + present + future strings.
        '''
        return self._concatenate(self._past, self._present, self._future)

    def past_present(self):
        '''
        Returns the field of joint past + present strings.
        '''
        return self._concatenate(self._past, self._present)

    def past_future(self):
        '''
        Returns the field of joint past + future strings.
        '''
        return self._concatenate(self._past, self._future)

    def present_future(self):
        '''
        Returns the field of joint present + future strings.
        '''
        return self._concatenate(self._present, self._future)

    #------------------------------------------------------------------
    # empirical distributions (dict: configuration string -> relative frequency)
    #------------------------------------------------------------------

    def full_process_probs(self):
        '''
        Returns the distribution over joint past + present + future strings.
        '''
        return self._cached_probs(self._full_process_probs, self.full_process)

    def past_probs(self):
        '''
        Returns the distribution over past light cone strings.
        '''
        return self._cached_probs(self._past_probs, self.past)

    def present_probs(self):
        '''
        Returns the distribution over present symbols, keyed by str(symbol).
        '''
        return self._cached_probs(self._present_probs, self.present)

    def future_probs(self):
        '''
        Returns the distribution over future light cone strings.
        '''
        return self._cached_probs(self._future_probs, self.future)

    def past_present_probs(self):
        '''
        Returns the distribution over joint past + present strings.
        '''
        return self._cached_probs(self._past_present_probs, self.past_present)

    def past_future_probs(self):
        '''
        Returns the distribution over joint past + future strings.
        '''
        return self._cached_probs(self._past_future_probs, self.past_future)

    def present_future_probs(self):
        '''
        Returns the distribution over joint present + future strings.
        '''
        return self._cached_probs(self._present_future_probs, self.present_future)

    def splitter(self, joint, choice):
        '''
        Extracts one part ('past', 'present', 'future', ...) of a joint string using joint_splitter
        with this object's past depth and propagation speed.
        '''
        return joint_splitter(joint, self._past_depth, self._c, choice)

    #------------------------------------------------------------------
    # b_mu
    #------------------------------------------------------------------

    def b_mu_slow(self):
        '''
        Mutual information between the past light cone and present, given the future light cone.
        Brute force enumeration; kept as a cross check for b_mu.
        '''
        return self._expectation(self.full_process_probs(), self._ln_b, self._candidate_triples())

    def b_mu(self):
        '''
        Mutual information between the past light cone and present, given the future light cone.
        '''
        return self._expectation(self.full_process_probs(), self._ln_b, self._observed_triples())

    def local_b_mu_slow(self):
        '''
        Dictionary from joint strings to their pointwise b_mu (brute force enumeration).
        Shares its cache with local_b_mu.
        '''
        return self._local_table(self._b, self._ln_b, self._candidate_triples())

    def local_b_mu(self):
        '''
        Dictionary from joint strings to their pointwise b_mu.
        '''
        return self._local_table(self._b, self._ln_b, self._observed_triples())

    #------------------------------------------------------------------
    # r_mu
    #------------------------------------------------------------------

    def r_mu(self):
        '''
        Conditional entropy of the present given both the past and the future light cone.
        NOTE(review): the original method was an unfinished stub; this definition
        (H[present | past, future]) should be confirmed against the intended one.
        '''
        return self._expectation(self.full_process_probs(), self._ln_r, self._observed_triples())

    def local_r_mu(self):
        '''
        Dictionary from joint strings to their pointwise r_mu, -log2 p(joint) / p(past, future).
        '''
        return self._local_table(self._r, self._ln_r, self._observed_triples())

    #------------------------------------------------------------------
    # h_mu
    #------------------------------------------------------------------

    def h_mu_slow(self):
        '''
        Conditional entropy of the present given the past light cone. Joint is past-present.
        Brute force enumeration; kept as a cross check for h_mu.
        '''
        return self._expectation(self.past_present_probs(), self._ln_h, self._candidate_pairs())

    def h_mu(self):
        '''
        Conditional entropy of the present given the past light cone.
        '''
        return self._expectation(self.past_present_probs(), self._ln_h, self._observed_pairs())

    def local_h_mu_slow(self):
        '''
        Joint is past-present. Local h_mu dictionary maps past_present values to their pointwise h_mu
        (brute force enumeration). Shares its cache with local_h_mu.
        '''
        return self._local_table(self._h, self._ln_h, self._candidate_pairs())

    def local_h_mu(self):
        '''
        Dictionary from past-present strings to their pointwise h_mu.
        '''
        return self._local_table(self._h, self._ln_h, self._observed_pairs())

    #------------------------------------------------------------------
    # sigma_mu
    #------------------------------------------------------------------

    def sigma_mu_slow(self):
        '''
        Mutual information between past and future light cones, given the present.
        Brute force enumeration; kept as a cross check for sigma_mu.
        '''
        return self._expectation(self.full_process_probs(), self._ln_sigma, self._candidate_triples())

    def sigma_mu(self):
        '''
        Mutual information between past and future light cones, given the present.
        '''
        return self._expectation(self.full_process_probs(), self._ln_sigma, self._observed_triples())

    def local_sigma_mu_slow(self):
        '''
        Dictionary from joint strings to their pointwise sigma_mu (brute force enumeration).
        Shares its cache with local_sigma_mu.
        '''
        return self._local_table(self._sigma, self._ln_sigma, self._candidate_triples())

    def local_sigma_mu(self):
        '''
        Dictionary from joint strings to their pointwise sigma_mu.
        '''
        return self._local_table(self._sigma, self._ln_sigma, self._observed_triples())

    #------------------------------------------------------------------
    # q_mu
    #------------------------------------------------------------------

    def q_mu_slow(self):
        '''
        Three-way mutual information between past light cone, present, and future light cone.
        Brute force enumeration; kept as a cross check for q_mu.
        '''
        return self._expectation(self.full_process_probs(), self._ln_q, self._candidate_triples())

    def q_mu(self):
        '''
        Three-way mutual information between past light cone, present, and future light cone.
        '''
        return self._expectation(self.full_process_probs(), self._ln_q, self._observed_triples())

    def local_q_mu_slow(self):
        '''
        Dictionary from joint strings to their pointwise q_mu (brute force enumeration).
        Shares its cache with local_q_mu.
        '''
        return self._local_table(self._q, self._ln_q, self._candidate_triples())

    def local_q_mu(self):
        '''
        Dictionary from joint strings to their pointwise q_mu.
        '''
        return self._local_table(self._q, self._ln_q, self._observed_triples())

    #------------------------------------------------------------------
    # fields and diagrams
    #------------------------------------------------------------------

    def b_field(self):
        '''
        Field of local (pointwise) values of b_mu
        '''
        return self._lookup_field(self.local_b_mu(), self.full_process())

    def b_diagram(self, size = 15, colors = plt.cm.Greys):
        '''
        Plots the field of local (pointwise) values of b_mu
        '''
        self._diagram(self.b_field(), size, colors)

    def h_field(self):
        '''
        Field of local (pointwise) values of h_mu
        '''
        return self._lookup_field(self.local_h_mu(), self.past_present())

    def h_diagram(self, size = 15, colors = plt.cm.Greys):
        '''
        Plots the field of local (pointwise) values of h_mu
        '''
        self._diagram(self.h_field(), size, colors)

    def sigma_field(self):
        '''
        Field of local (pointwise) values of sigma_mu
        '''
        return self._lookup_field(self.local_sigma_mu(), self.full_process())

    def sigma_diagram(self, size = 15, colors = plt.cm.Greys):
        '''
        Plots the field of local (pointwise) values of sigma_mu
        '''
        self._diagram(self.sigma_field(), size, colors)

    def q_field(self):
        '''
        Field of local (pointwise) values of q_mu
        '''
        return self._lookup_field(self.local_q_mu(), self.full_process())

    def q_diagram(self, size = 15, colors = plt.cm.Greys):
        '''
        Plots the field of local (pointwise) values of q_mu
        '''
        self._diagram(self.q_field(), size, colors)

    def r_field(self):
        '''
        Field of local (pointwise) values of r_mu
        '''
        return self._lookup_field(self.local_r_mu(), self.full_process())

    def r_diagram(self, size = 15, colors = plt.cm.Greys):
        '''
        Plots the field of local (pointwise) values of r_mu
        '''
        self._diagram(self.r_field(), size, colors)
In [5]:
def joint_splitter(joint_config, past_depth, c, choice):
    '''
    Extracts part of a joint (past + present + future) configuration.

    The joint is split by position: the first lightcone_size(past_depth, c) - 1 entries are the past
    light cone, the next entry is the present, and everything after that is the future light cone.

    Parameters
    ----------
    joint_config: str
        Joint configuration, past first, then present, then future.

    past_depth: int
        Depth of the past light cone.

    c: int
        Propagation speed.

    choice: str
        One of 'past', 'present', 'future', 'past-present', 'past-future', 'present-future'.

    Returns
    -------
    part:
        The requested part of the joint. For any other choice a message is printed and None is returned.
    '''
    #position of the present symbol inside the joint
    cut = lightcone_size(past_depth, c) - 1

    if choice == 'past':
        return joint_config[:cut]
    if choice == 'present':
        return joint_config[cut]
    if choice == 'future':
        return joint_config[cut+1:]
    if choice == 'past-present':
        return joint_config[:cut+1]
    if choice == 'past-future':
        return str(joint_config[:cut]) + str(joint_config[cut+1:])
    if choice == 'present-future':
        return joint_config[cut:]

    print('Invalid partition choice')
    return None
In [8]:
# Synthetic 14x10 test field: ten rows of zeros between two rows of ones above and two below;
# rows 2 and 11 are then raised by 2.
test_field = np.zeros((10,10), dtype = int)
fill = np.ones((2,10),dtype = int)
test_field = np.vstack((fill,test_field,fill))
# NOTE(review): this rebinds the name `time`, which the first cell imports as a module.
time, space = np.shape(test_field)
test_field[2]+=2
test_field[11]+=2
print test_field
In [55]:
# Past depth 3, future depth 2, propagation speed 1; alphabet size is inferred from the data.
test = information_field(test_field,3,2,1)
In [9]:
print test.past()
In [10]:
print test.present()
In [11]:
print test.future()
In [ ]:
In [44]:
# For each measure: the fast and the brute force value, and the mean of the local field, for comparison.
print test.h_mu()
print test.h_mu_slow()
print np.mean(test.h_field())
In [30]:
test.h_field()
Out[30]:
In [51]:
print test.b_mu()
print test.b_mu_slow()
print np.mean(test.b_field())
In [33]:
test.b_field()
Out[33]:
In [52]:
print test.q_mu()
print test.q_mu_slow()
print np.mean(test.q_field())
In [37]:
test.q_field()
Out[37]:
In [56]:
print test.sigma_mu()
print test.sigma_mu_slow()
print np.mean(test.sigma_field())
In [38]:
np.mean(test.sigma_field())
Out[38]:
In [40]:
test.sigma_field()
Out[40]:
In [ ]:
In [ ]:
In [63]:
# ECA, domain_54, random_state and epsilon_field are not defined in this file; presumably they come
# from the star imports in the first cell - verify.
dom_54 = ECA(54,domain_54(15*4,'a'))
dom_54.evolve(15*4)
dom_54.diagram()
In [66]:
# Information field of the evolved spacetime: past depth 3, future depth 3, propagation speed 1.
info_54 = information_field(dom_54.get_spacetime(),3,3,1)
In [14]:
info_54.b_mu()
Out[14]:
In [56]:
# Timing of the b_mu diagram; the elapsed time is printed in minutes.
start = time.clock()
info_54.b_diagram()
end = time.clock()
print 'elapsed time: ', (end-start)/60
In [68]:
import time
start = time.clock()
print info_54.h_field()
end = time.clock()
print 'elapsed time: ', (end-start)/60
In [65]:
print info_54.q_mu()
In [66]:
print info_54.sigma_mu()
In [67]:
info_54.q_diagram()
In [68]:
info_54.sigma_diagram()
In [ ]:
In [57]:
# Rule 54 started from random_state(200,2) and evolved for 200 steps.
rule_54=ECA(54, random_state(200,2))
rule_54.evolve(200)
rule_54.diagram()
In [16]:
test_54 = information_field(rule_54.get_spacetime(),3,3,1)
In [18]:
import time
start = time.clock()
print test_54.b_mu()
end = time.clock()
print "elapsed time: ", (end-start)/60
In [19]:
import time
start = time.clock()
# NOTE(review): information_field as defined in this file has no b_mu_fast method.
print test_54.b_mu_fast()
end = time.clock()
print "elapsed time: ", (end-start)/60
In [58]:
import time
start = time.clock()
test_54.h_diagram()
end = time.clock()
print "elapsed time: ", (end-start)/60
In [60]:
import time
start = time.clock()
test_54.q_diagram()
end = time.clock()
print "elapsed time: ", (end - start)/60
In [61]:
test_54.sigma_diagram()
In [62]:
test_54.b_diagram()
In [63]:
# Same spacetime passed through epsilon_field: estimate states, filter, then plot.
states_54 = epsilon_field(rule_54.get_spacetime())
In [64]:
states_54.estimate_states(3,3,1)
In [65]:
states_54.filter_data()
In [66]:
states_54.complexity_diagram(10)
In [ ]:
In [86]:
# Spot checks of the probability tables at row 20, column 20.
pres = str(test_54.present()[20,20])
past = test_54.past()[20,20]
# NOTE(review): information_field as defined in this file has no probabilities() method.
prob = test_54.probabilities()
In [87]:
print prob[past]
print prob[pres]
In [88]:
# Ratio p(past, present) / p(past) at that point.
print prob[test_54.past_present()[20,20]]/prob[past]
In [90]:
print np.shape(test_54.present())
print np.shape(test_54.past())
print np.shape(test_54.future())
In [100]:
# The same ratio collected at every point of the field.
rows, columns = np.shape(test_54.present())
check = []
for i in xrange(rows):
for j in xrange(columns):
check.append(prob[test_54.past_present()[i,j]] / prob[test_54.past()[i,j]])
In [102]:
# Distinct ratio values together with their counts.
print np.unique(check,False,False,True)
In [22]:
pasts = np.unique(test_54.past())
In [23]:
# Sum of the past light cone probabilities over all distinct pasts.
Z = 0
for past in pasts:
Z += test_54.past_probs()[str(past)]
print Z
In [ ]:
In [19]:
# Past distribution rebuilt by hand from np.unique counts.
p_test = {}
states, counts = np.unique(test_54.past(),False,False,True)
test_Z = np.sum(counts)
for i, count in enumerate(counts):
p_test.update({states[i]:count/test_Z})
In [20]:
# Sum of the hand-built probabilities over all distinct pasts.
Z = 0
for past in pasts:
Z += p_test[past]
print Z
In [ ]:
In [27]:
# For each distinct present-future configuration whose present is 0 (first occurrence only),
# collect p(present, future) / p(present); the next cell prints the sum.
pf_prob = test_54.present_future_probs()
p_prob = test_54.present_probs()
rows, columns = np.shape(test_54.present())
check = []
uniques = []
for i in xrange(rows):
for j in xrange(columns):
if test_54.present()[i,j]==0 and test_54.present_future()[i,j] not in uniques:
uniques.append(test_54.present_future()[i,j])
check.append(pf_prob[test_54.present_future()[i,j]] / p_prob[str(test_54.present()[i,j])])
In [29]:
print np.sum(check)
In [67]:
# Scratch cells: effect of the [::-1] slice on a list and on 2-D arrays (it reverses the row order).
x = [1,2,3,4,5]
print x[::-1]
In [68]:
y = [0,0,0,0,0]
z = np.vstack((x,y))
print z
print z[::-1]
In [71]:
a = np.array([[1,1,1,1],[2,2,2,2],[3,3,3,3],[4,4,4,4]])
In [72]:
print a
In [73]:
print a[::-1]
In [134]:
class local_excess_entropy(object):
    '''
    Object that handles forward and reverse time epsilon fields, gathers statistics over forward and reverse states,
    and calculates the field of local excess entropy values.

    The reverse process is the forward process with the time (vertical) axis flipped. Excess entropy is
    computed as the mutual information, in bits, between the forward and reverse states found at the
    same spacetime point.
    '''

    def __init__(self, process, transient_time):
        '''
        Stores the process and builds its forward and time-reversed versions.

        Parameters
        ----------
        process: array_like
            Two dimensional spacetime data. Time runs in the vertical direction.

        transient_time: int
            Number of initial time steps to discard. Transient time should be larger than
            past lightcone depth.
        '''
        self._original_process = process
        self._transient = transient_time
        self._forward_process = np.copy(process[transient_time:])
        self._reverse_process = np.copy(self._forward_process[::-1])
        self._time, self._space = np.shape(self._forward_process)
        #state estimation parameters
        self._c = None
        self._lightcone_depth = None
        self._A = None
        self._alpha = None
        #cached results, filled on first request
        self._forward_states = None
        self._reverse_states = None
        self._forward_dist = None
        self._reverse_dist = None
        self._joint_dist = {}

    def estimate_states(self, lightcone_depth, propogation_speed, alphabet_size = None, alpha = 0.05):
        '''
        Creates epsilon fields for the forward and the reverse process and estimates their states.

        Parameters
        ----------
        lightcone_depth: int
            Depth used for both past and future light cones.

        propogation_speed: int
            Speed of information propagation.

        alphabet_size: int, optional
            Number of distinct symbols. If None, the number of distinct values in the original
            process is recorded on this object; the argument itself is forwarded unchanged to the
            epsilon fields.

        alpha: float, optional
            Forwarded to epsilon_field.estimate_states (presumably a significance level - verify
            against epsilon_field).
        '''
        #fill in state estimation parameters
        self._c = propogation_speed
        self._lightcone_depth = lightcone_depth
        self._alpha = alpha
        if alphabet_size is None:
            self._A = np.size(np.unique(self._original_process))
        else:
            self._A = alphabet_size
        #create epsilon fields for forward and reverse process, and estimate the forward and reverse states using
        #the state estimation parameters
        self._forward_field = epsilon_field(self._forward_process)
        self._reverse_field = epsilon_field(self._reverse_process)
        self._forward_field.estimate_states(lightcone_depth, lightcone_depth, propogation_speed, alphabet_size, alpha)
        self._reverse_field.estimate_states(lightcone_depth, lightcone_depth, propogation_speed, alphabet_size, alpha)

    def _trim(self, causal_field):
        '''
        Drops the first and last lightcone_depth rows of a causal state field.
        '''
        return causal_field[self._lightcone_depth : self._time-self._lightcone_depth]

    @staticmethod
    def _state_histogram(state_field):
        '''
        Returns an array whose entry s is the relative frequency of state s in state_field.
        States are used directly as array indices.
        '''
        states, counts = np.unique(state_field, return_counts = True)
        hist = np.zeros(max(states)+1)
        for state, count in zip(states, counts):
            hist[state] = count
        return hist / np.sum(hist)

    def forward_state_field(self):
        '''
        Returns the field of forward states (computed on first call, after estimate_states).
        '''
        if self._forward_states is None:
            self._forward_field.filter_data()
            self._forward_states = self._trim(self._forward_field.get_causal_field())
        return self._forward_states

    def reverse_state_field(self):
        '''
        Returns the field of reverse states, in reversed time order (computed on first call,
        after estimate_states).
        '''
        if self._reverse_states is None:
            self._reverse_field.filter_data()
            self._reverse_states = self._trim(self._reverse_field.get_causal_field())
        return self._reverse_states

    def forward_state_dist(self):
        '''
        Returns the distribution over forward states as an array indexed by state.
        '''
        if self._forward_dist is None:
            self._forward_dist = self._state_histogram(self.forward_state_field())
        return self._forward_dist

    def reverse_state_dist(self):
        '''
        Returns the distribution over reverse states as an array indexed by state.
        '''
        if self._reverse_dist is None:
            self._reverse_dist = self._state_histogram(self.reverse_state_field())
        return self._reverse_dist

    def joint_state_field(self):
        '''
        Returns the field of (forward state, reverse state) pairs as a list of rows, each row a list
        of tuples. The reverse field is flipped back so both fields run in the same time direction.
        '''
        forward = self.forward_state_field()
        reverse = self.reverse_state_field()[::-1]
        time = np.shape(forward)[0]
        #list(...) so that rows can be indexed regardless of what zip returns
        return [list(zip(forward[i], reverse[i])) for i in range(time)]

    def joint_state_dist(self):
        '''
        Mapping from state joints (forward state, reverse state) to their probabilities.
        '''
        if len(self._joint_dist) == 0:
            #imported here so the method does not depend on Counter arriving through a star import
            from collections import Counter
            #flatten joint_state_field so that it can be run through Counter object
            stats = [pair for row in self.joint_state_field() for pair in row]
            #normalization; float() so the result does not depend on "from __future__ import division"
            Z = float(len(stats))
            for pair, count in Counter(stats).items():
                self._joint_dist[pair] = count / Z
        return self._joint_dist

    def excess_entropy(self):
        '''
        Returns excess entropy for the spacetime process
        '''
        forward_dist = self.forward_state_dist()
        reverse_dist = self.reverse_state_dist()
        E = 0
        for joint, prob in self.joint_state_dist().items():
            E += prob * np.log(prob / (forward_dist[joint[0]] * reverse_dist[joint[1]]))
        return E / np.log(2)

    def excess_entropy_field(self):
        '''
        Returns field of local (pointwise) excess entropy values. Here, excess entropy is calculated as the mutual information
        between forward and reverse local causal states.
        '''
        #build everything that does not depend on the spacetime point once, outside the loops
        joints = self.joint_state_field()
        joint_dist = self.joint_state_dist()
        forward_dist = self.forward_state_dist()
        reverse_dist = self.reverse_state_dist()

        time, space = np.shape(self.forward_state_field())
        e_field = np.zeros((time, space))
        for i in range(time):
            for j in range(space):
                joint = joints[i][j]
                e_field[i][j] = np.log(joint_dist[joint] / \
                                       (forward_dist[joint[0]] * reverse_dist[joint[1]])) / np.log(2)
        return e_field
In [73]:
# Synthetic 14x10 test field: ten rows of zeros between two rows of ones above and two below;
# rows 2 and 11 are then raised by 2.
test_field = np.zeros((10,10), dtype = int)
fill = np.ones((2,10),dtype = int)
test_field = np.vstack((fill,test_field,fill))
# NOTE(review): this rebinds the name `time`, which the first cell imports as a module.
time, space = np.shape(test_field)
test_field[2]+=2
test_field[11]+=2
print test_field
In [135]:
# No transient is discarded (transient_time = 0).
ee_test = local_excess_entropy(test_field, 0)
In [136]:
# Light cone depth 2, propagation speed 1; alphabet size and alpha take their defaults.
ee_test.estimate_states(2,1)
In [137]:
print ee_test.forward_state_field()
In [138]:
print ee_test.reverse_state_field()
In [139]:
print np.shape(ee_test.forward_state_field())
print np.shape(ee_test.reverse_state_field())
In [140]:
print ee_test.forward_state_field()
In [141]:
# Reverse field flipped back along the time axis, as joint_state_field does internally.
print ee_test.reverse_state_field()[::-1]
In [142]:
for i in xrange(10):
print ee_test.joint_state_field()[i]
In [151]:
dist_test = ee_test.joint_state_dist()
for item in dist_test.items():
print item
In [144]:
print ee_test.excess_entropy()
In [145]:
print ee_test.excess_entropy_field()
In [146]:
print np.shape(ee_test.excess_entropy_field())
In [147]:
print np.unique(ee_test.excess_entropy_field())
In [148]:
# Mean of the local field, printed next to the global value for comparison.
print np.mean(ee_test.excess_entropy_field())
In [149]:
print ee_test.excess_entropy()
In [ ]: